In [1]:
# Imports relativos ao sistema operacional
import os
import sys
In [2]:
# Passagem de caminhos do Spark e do diretório de dados
SPARK_PATH = "/Users/flavio.clesio/Documents/spark-2.1.0"
ROOT_DIR = "/Users/flavio.clesio/Downloads/machine-learning-with-spark-code/data/bike-sharing/Bike-Sharing-Dataset"
In [3]:
# Neste snippet passamos os caminhos do Spark como variaveis de ambiente para o sistema operacional
os.environ['SPARK_HOME'] = SPARK_PATH
os.environ['HADOOP_HOME'] = SPARK_PATH
# Neste snippet passamos todas as partes da instalacao do Spark
sys.path.append(SPARK_PATH + "/bin")
sys.path.append(SPARK_PATH + "/python")
sys.path.append(SPARK_PATH + "/python/pyspark/")
sys.path.append(SPARK_PATH + "/python/lib")
sys.path.append(SPARK_PATH + "/python/lib/pyspark.zip")
sys.path.append(SPARK_PATH + "/python/lib/py4j-0.10.4-src.zip") # Must be the same version of your Spark Version
In [154]:
%matplotlib inline
In [155]:
# Vamos fazer agora alguns imports iniciais em relação ao Spark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
import matplotlib.pyplot
import numpy as np
import matplotlib
matplotlib.use('Agg')
In [156]:
%pylab inline
pylab.rcParams['figure.figsize'] = (14, 9)
In [5]:
# Instanciamento da sessao do Spark
sc = SparkContext("local", "app-regressao-pyspark")
sc
Out[5]:
Paper: Fanaee-T, Hadi and Gama Joao, Event labeling combining ensemble detectors and background knowledge, Progress in Artificial Intelligence, pp.1-15, Springer Berlin Heidelberg, 2013.
Paper original: http://link.springer.com/article/10.1007%2Fs13748-013-0040-3.
Descricao das variaveis:
• instant: Chave da transacao
• dteday: Data
• season: Estacao do ano
• yr: Ano
• mnth: Mes do ano
• hr: Hora do ano
• holiday: Se e feriado ou nao
• weekday: Qual o dia da semana
• workingday: Se e dia util ou nao
• weathersit: Descricao do tempo
• temp: Temperatura normalizada
• atemp: Temperatura aparente
• hum: Umidade aparente
• windspeed: Velocidade do vento normalizada
• cnt: Variavel target com o numero de bikes alugadas na respectiva hora
In [12]:
# Carga do arquivo .csv
raw_data = sc.textFile(ROOT_DIR + "/hour_noheader.csv")
In [18]:
# Contagem simples em relacao ao numero de registros
num_data = raw_data.count()
print 'Quantidade de registros:', num_data
In [14]:
# Transformacao do arquivo em um RDD realizando a divisao pela virgula
records = raw_data.map(lambda x: x.split(","))
In [15]:
# Primeiro registro ja sem os headers
first = records.first()
print first
In [16]:
# Vamos colocar os dados em cache, ja que vamos realizar inumeras leituras
records.cache()
Out[16]:
In [19]:
# A funcao abaixo ira realizar o map no rdd passado como parametro, e aplicando a funcao lambda nos campos pegando
# cada valor do seu indice fields[idx] e nao somente isso ira pegar os registros distintos de cada campo .distinct()
# e cada elmento sera indexado de acordo com a sua aparicao usando o .zipWithIndex() e por no final o .collectAsMap()
# vai retornar o valor pareado via chave-valor
def get_mapping(rdd, idx):
return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()
In [27]:
print "Mapeamento da primeira coluna categorica: %s" % get_mapping(records, 2)
In [ ]:
# Agora o mapeamento sera feito para as variaveis da 2 ate a 10 de forma incremental e sera armazenada no objeto
# mappings
mappings = [get_mapping(records, i) for i in range(2,10)]
In [30]:
# Mapeamentos para cada uma das variaveis (Por ordem)
mappings
Out[30]:
In [32]:
# Aqui vamos fazer a soma de quantas categorias diferentes apareceram
cat_len = sum(map(len, mappings))
print 'Numero de categorias distintas das variaveis categoricas:', cat_len
In [38]:
# Primeiro registro das colunas 11 até a 15
records.first()[11:15]
Out[38]:
In [39]:
# Primeiro registro do dataset records
records.first()
Out[39]:
In [40]:
# Tamanho das variaveis numericas
num_len = len(records.first()[11:15])
print 'Quantidade de variaveis numericas:', num_len
In [41]:
total_len = num_len + cat_len
In [45]:
print "Tamanho do vetor de features (Feature vector) das variaveis categoricas: %d" % cat_len
In [46]:
print "Tamanho do vetor de features (Feature vector) das variaveis numericas: %d" % num_len
In [47]:
print "Tamanho total do vetor de features: %d" % total_len
No caso vamos pegar os mappings que foram extraidos anteriormente das variaveis categoricas, e vamos transformar em variaveis binarias.
No caso a funcao abaixo vai aplicar esse binary-encoding para todas as instancias da base de dados.
Tambem sera criada uma funcao para extrair a variavel target de cada registro.
O import da classe LabeledPoint vai construir os vetores de features e as variaveis target.
In [49]:
# A funcao passa por cada coluna do dataset e faz a extracao do encoding binario de cada uma das variaveis. A variavei
# step garante que uma feature nao-zero sera acrescida ao vetor de features. A vetor numerico sera criado primeiramente
# convertendo os dados para ponto flutuante e guardar esse numero em um array do numpy
def extract_features(record):
cat_vec = np.zeros(cat_len)
i = 0
step = 0
for field in record[2:9]:
m = mappings[i]
idx = m[field]
cat_vec[idx + step] = 1
i = i + 1
step = step + len(m)
num_vec = np.array([float(field) for field in record[10:14]])
return np.concatenate((cat_vec, num_vec))
In [50]:
# Essa funcao simplesmente converte a ultima coluna da variavel (um count) em um float
def extract_label(record):
return float(record[-1])
In [52]:
# Usando o metodo .map vamos passar as duas funcoes no dataset records para gerar a base data
data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))
In [55]:
data.take(3)
Out[55]:
In [56]:
first_point = data.first()
In [57]:
print "Raw data: " + str(first[2:])
In [58]:
print "Label: " + str(first_point.label)
In [59]:
print "Linear Model feature vector:\n" + str(first_point.features)
In [60]:
print "Linear Model feature vector length: " + str(len(first_point.features))
In [61]:
# Funcao para extracao de features para o modelo de arvore de decisao regressora
def extract_features_dt(record):
return np.array(map(float, record[2:14]))
In [62]:
# Transformacao em labelpoint realizando a extracao do label e das features
data_dt = records.map(lambda r: LabeledPoint(extract_label(r),extract_features_dt(r)))
In [69]:
first_point_dt = data_dt.first()
In [70]:
print "Decision Tree feature vector: " + str(first_point_dt.features)
In [71]:
print "Decision Tree feature vector length: " + str(len(first_point_dt.features))
In [67]:
# Paarmetros do modelo linear com SGD
help(LinearRegressionWithSGD.train)
In [68]:
# Paarmetros do modelo linear com arvore de decisao regressora
help(DecisionTree.trainRegressor)
In [74]:
linear_model = LinearRegressionWithSGD.train(data
,iterations=10
,step=0.1
,intercept=False)
In [75]:
# Aqui vamos usar o .map para o nosso conjunto de dados data, e usando a funcao lambda vamos pegar a coluna com os valores
# que no caso é o p.label e o resultado das predicoes no .peatures
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
In [76]:
print "Linear Model predictions: " + str(true_vs_predicted.take(5))
In [77]:
# Agora vamos usar o regressor da arvore de decisao regressora. No caso quando se a algum tipo de variavel categorica
# e necessario passar como argumento categoricalFeaturesInfo. No caso vai ficar as...is
dt_model = DecisionTree.trainRegressor(data_dt
,{})
# A diferenca fundamental que precisa ser enendida e que no modelo linear, obrigatoriamente as variavels dummy precisam
# estar criadas no vetor de features, enquanto no modelo de arvore de decisao, como o algoritmo tem propriedades
# de quebra da arvore, nem sempre isso e necessario, mas e recomendado
In [78]:
preds = dt_model.predict(data_dt.map(lambda p: p.features))
In [79]:
actual = data.map(lambda p: p.label)
In [80]:
true_vs_predicted_dt = actual.zip(preds)
In [81]:
print "Decision Tree predictions: " + str(true_vs_predicted_dt.take(5))
In [82]:
print "Decision Tree depth: " + str(dt_model.depth())
In [83]:
print "Decision Tree number of nodes: " + str(dt_model.numNodes())
In [90]:
# Para a avaliacao dos modelos regresores, geralmente usa-se as medidas de erro MSE (Mean Square Error),
# MAE (Mean Absolute Error), RMSE (Root Mean Square Error) em que RMSLE (Root Mean Squared Log Error):
# MSE: Media do erro quadratico que e usado na funcao de perda da regressao. Penaliza de forma mais agressiva os erros mais extremos
# RMSE: Raiz quadrada do MSE. E medido na mesma escala da variavel
# MAE: Media da diferenca absoluta entre as predicoes
# RMSLE: E a transformacao do RMSE e deve ser usada quando o range de predicao for grande, e nao se quer penalizar
# os grandes erros. Isso e efetivo quando se quer ter o maximo de acertos em % ao inves da diferenca dos erros em si
def squared_error(actual, pred):
return (pred - actual)**2
def abs_error(actual, pred):
return np.abs(pred - actual)
def squared_log_error(pred, actual):
return (np.log(pred + 1) - np.log(actual + 1))**2
In [91]:
mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()
mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())
In [92]:
print "Linear Model - Mean Squared Error: %2.4f" % mse
print "Linear Model - Mean Absolute Error: %2.4f" % mae
print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle
In [93]:
mse_dt = true_vs_predicted_dt.map(lambda (t, p): squared_error(t, p)).mean()
mae_dt = true_vs_predicted_dt.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())
In [94]:
print "Decision Tree - Mean Squared Error: %2.4f" % mse_dt
print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dt
print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt
In [99]:
import pylab
%pylab
In [100]:
targets = records.map(lambda r: float(r[-1])).collect()
In [104]:
hist(targets, bins=40, color='lightblue', normed=True)
Out[104]:
In [105]:
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
In [106]:
# Devido ao fato de que a distribuicao nao e uniformemente distribuida, no caso temos que transformar os dados para
# logaritmo
log_targets = records.map(lambda r: np.log(float(r[-1]))).collect()
In [107]:
hist(log_targets, bins=40, color='lightblue', normed=True)
Out[107]:
In [108]:
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
In [109]:
# Quando nao houver valores negativos e o range de valores for bem alto, recomenda-se a transformacao via raiz quadrada
sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()
In [110]:
hist(sqrt_targets, bins=40, color='lightblue', normed=True)
Out[110]:
In [111]:
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
In [112]:
data_log = data.map(lambda lp: LabeledPoint(np.log(lp.label),lp.features))
In [113]:
model_log = LinearRegressionWithSGD.train(data_log
,iterations=10
,step=0.1)
In [114]:
true_vs_predicted_log = data_log.map(lambda p: (np.exp(p.label),np.exp(model_log.predict(p.features))))
In [115]:
mse_log = true_vs_predicted_log.map(lambda (t, p): squared_error(t,p)).mean()
mae_log = true_vs_predicted_log.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda (t, p): squared_log_error(t, p)).mean())
In [116]:
print "Mean Squared Error: %2.4f" % mse_log
print "Mean Absolue Error: %2.4f" % mae_log
print "Root Mean Squared Log Error: %2.4f" % rmsle_log
print "Non log-transformed predictions:\n" + str(true_vs_predicted.take(3))
print "Log-transformed predictions:\n" + str(true_vs_predicted_log.take(3))
In [117]:
data_dt_log = data_dt.map(lambda lp:LabeledPoint(np.log(lp.label), lp.features))
dt_model_log = DecisionTree.trainRegressor(data_dt_log,{})
preds_log = dt_model_log.predict(data_dt_log.map(lambda p:p.features))
actual_log = data_dt_log.map(lambda p: p.label)
true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda (t,p): (np.exp(t), np.exp(p)))
mse_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): squared_error(t, p)).mean()
mae_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): abs_error(t,p)).mean()
rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda (t, p):squared_log_error(t, p)).mean())
In [118]:
print "Mean Squared Error: %2.4f" % mse_log_dt
print "Mean Absolue Error: %2.4f" % mae_log_dt
print "Root Mean Squared Log Error: %2.4f" % rmsle_log_dt
print "Non log-transformed predictions:\n" + str(true_vs_predicted_dt.take(3))
print "Log-transformed predictions:\n" + str(true_vs_predicted_dt_log.take(3))
In [119]:
# Atribuicao de index para todos os campos via chave e valor
data_with_idx = data.zipWithIndex().map(lambda (k, v): (v, k))
In [120]:
# Nesse caso a base de teste sera criada usando 20% de todo o conjunto de dados
test = data_with_idx.sample(False, 0.2, 42)
In [121]:
# O subtractByKey() faz o trabalho de remover da base inicial as instancias que tem overlap
# ou seja, esses serao os 80% da base de testes
train = data_with_idx.subtractByKey(test)
In [123]:
train_data = train.map(lambda (idx, p): p)
test_data = test.map(lambda (idx, p) : p)
train_size = train_data.count()
test_size = test_data.count()
In [124]:
print "Training data size: %d" % train_size
print "Test data size: %d" % test_size
print "Total data size: %d " % num_data
print "Train + Test size : %d" % (train_size + test_size)
In [125]:
data_with_idx_dt = data_dt.zipWithIndex().map(lambda (k, v): (v, k))
test_dt = data_with_idx_dt.sample(False, 0.2, 42)
train_dt = data_with_idx_dt.subtractByKey(test_dt)
train_data_dt = train_dt.map(lambda (idx, p): p)
test_data_dt = test_dt.map(lambda (idx, p) : p)
In [126]:
def evaluate(train, test, iterations, step, regParam, regType,intercept):
model = LinearRegressionWithSGD.train(train
,iterations
,step
,regParam=regParam
,regType=regType
,intercept=intercept)
tp = test.map(lambda p: (p.label, model.predict(p.features)))
rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
return rmsle
In [159]:
# Numero de interacoes como parametros
params = [1, 5, 10, 20, 50, 100]
metrics = [evaluate(train_data, test_data, param, 0.01, 0.0,'l2',False) for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
In [157]:
matplotlib.pyplot.plot(params, metrics)
matplotlib.pyplot.show()
In [161]:
# Step size
params = [0.01, 0.025, 0.05, 0.1, 1.0]
metrics = [evaluate(train_data, test_data, 10, param, 0.0, 'l2',False) for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
In [163]:
# Regularizacao Ridge (L2)
params = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]
metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l2',False) for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
In [164]:
matplotlib.pyplot.plot(params, metrics)
matplotlib.pyplot.xscale('log')
matplotlib.pyplot.show()
In [168]:
# Regularizacao L! (Lasso)
params = [0.0, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]
metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l1',False) for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
In [169]:
matplotlib.pyplot.plot(params, metrics)
matplotlib.pyplot.xscale('log')
matplotlib.pyplot.show()
In [167]:
model_l1 = LinearRegressionWithSGD.train(train_data, 10, 0.1,regParam=1.0, regType='l1', intercept=False)
model_l1_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1,regParam=10.0, regType='l1', intercept=False)
model_l1_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1,regParam=100.0, regType='l1', intercept=False)
In [170]:
print "L1 (1.0) number of zero weights: " + str(sum(model_l1.weights.array == 0))
print "L1 (10.0) number of zeros weights: " + str(sum(model_l1_10.weights.array == 0))
print "L1 (100.0) number of zeros weights: " + str(sum(model_l1_100.weights.array == 0))
In [ ]:
# Como a regularizacao e mais agressiva o numero de numeros zeros e maior quanto mais se aumenta a regularizacao
In [171]:
# Intercepto
params = [False, True]
metrics = [evaluate(train_data, test_data, 10, 0.1, 1.0, 'l2', param)for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
In [172]:
# Put in bar
matplotlib.pyplot.bar(params, metrics)
matplotlib.pyplot.xscale('log')
matplotlib.pyplot.show()
In [174]:
def evaluate_dt(train, test, maxDepth, maxBins):
model = DecisionTree.trainRegressor(train
, {}
,impurity='variance'
,maxDepth=maxDepth
,maxBins=maxBins)
preds = model.predict(test.map(lambda p: p.features))
actual = test.map(lambda p: p.label)
tp = actual.zip(preds)
rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t,p)).mean())
return rmsle
In [175]:
# Tree Depth
params = [1, 2, 3, 4, 5, 10, 20]
metrics = [evaluate_dt(train_data_dt, test_data_dt, param, 32) for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
plot(params, metrics)
fig = matplotlib.pyplot.gcf()
In [176]:
# Maximo de bins
params = [2, 4, 8, 16, 32, 64, 100]
metrics = [evaluate_dt(train_data_dt, test_data_dt, 5, param) for param in params]
print 'Parametros escolhidos:', params
print 'RMSLE dos parametros', metrics
plot(params, metrics)
fig = matplotlib.pyplot.gcf()
In [ ]:
# Save and load model
#model.save(sc, "home/myDecisionTreeClassificationModel")
In [ ]:
#sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")